Mirror upstream elastic/elasticsearch#136781 for AI review (snapshot of HEAD tree) #280
BASE=9770442d77ceac0dda6f6e6a07d0d386885ab52a HEAD=955fbccc5328132bd5bcbb95bab60ca5b355e8ca Branch=main
Pull Request Overview
This PR improves data loading performance by introducing parallel execution for creating indices, loading data, and setting up enrich policies. It also updates test code to handle exceptions properly and adds concurrent request handling in multi-cluster tests.
Key changes:
- Parallelized data loading operations using a semaphore-based approach with 10 concurrent threads
- Refactored data loading to separate index creation from data ingestion
- Added asynchronous request execution in multi-cluster tests using CompletableFuture
Reviewed Changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| CsvTestsDataLoader.java | Introduced parallel execution for index creation, data loading, and enrich policy setup; split monolithic load method into createIndex and loadData |
| CsvTestsDataLoaderTests.java | Updated test to expect AssertionError wrapping ConnectException due to parallel execution framework |
| MultiClusterSpecIT.java | Added runInParallel method to execute requests concurrently on local and remote clients using async APIs |
    private static <T> void executeInParallel(List<T> items, IOConsumer<T> consumer, String errorMessage) {
        Semaphore semaphore = new Semaphore(PARALLEL_THREADS);
        ESTestCase.runInParallel(items.size(), i -> {
            try {
                semaphore.acquire();
                consumer.accept(items.get(i));
            } catch (IOException | InterruptedException e) {
                throw new RuntimeException(errorMessage, e);
            } finally {
                semaphore.release();
            }
        });
    }
Copilot AI, Nov 7, 2025
The semaphore acquisition is happening inside the parallel tasks, which defeats its purpose as a concurrency limiter. Since all tasks start simultaneously via runInParallel, they will all attempt to acquire the semaphore at once, causing most threads to block immediately.
The semaphore should limit the number of concurrently executing tasks, but the current implementation doesn't achieve this effectively because:
- All tasks are spawned upfront by runInParallel(items.size(), ...)
- Each task then tries to acquire the semaphore
A better approach would be to use a thread pool with a fixed size (e.g., ExecutorService with PARALLEL_THREADS threads) or restructure the code to submit tasks to the semaphore-controlled executor rather than spawning all threads first.
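The fixed-size thread pool suggested above could look roughly like the following. This is a minimal sketch, not the PR's code: the `IOConsumer` interface is redeclared here as a hypothetical stand-in, `PARALLEL_THREADS = 10` is assumed from the overview, and the class name `BoundedParallelSketch` is invented for illustration.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class BoundedParallelSketch {
    // Assumed value: the PR overview mentions 10 concurrent threads.
    static final int PARALLEL_THREADS = 10;

    /** Hypothetical stand-in for the IOConsumer type used in the PR. */
    interface IOConsumer<T> {
        void accept(T item) throws IOException;
    }

    static <T> void executeInParallel(List<T> items, IOConsumer<T> consumer, String errorMessage) {
        // The pool itself caps concurrency: at most PARALLEL_THREADS threads exist,
        // so no task has to block on a semaphore after it has been started.
        ExecutorService pool = Executors.newFixedThreadPool(PARALLEL_THREADS);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (T item : items) {
                // The lambda returns a value, so it is a Callable and may throw IOException.
                futures.add(pool.submit(() -> {
                    consumer.accept(item);
                    return null;
                }));
            }
            for (Future<?> future : futures) {
                try {
                    future.get();
                } catch (ExecutionException e) {
                    // Surface the task's own failure as the cause.
                    throw new RuntimeException(errorMessage, e.getCause());
                } catch (InterruptedException e) {
                    // Restore the interrupt flag before giving up.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(errorMessage, e);
                }
            }
        } finally {
            // Stop any remaining work once all results are in or a failure occurred.
            pool.shutdownNow();
        }
    }
}
```

With this shape, queued items wait in the executor's work queue rather than as blocked threads. Whether that matters in practice depends on how many items are loaded, which the review comment does not state.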
    Response remote = remoteResponse.get();
    Response local = localResponse.get();
    assertEquals(remote.getStatusLine().getStatusCode(), local.getStatusLine().getStatusCode());
    return local;
Copilot AI, Nov 7, 2025
The runInParallel method doesn't handle InterruptedException properly when calling CompletableFuture.get(). If the thread is interrupted while waiting for the futures, the InterruptedException will be caught by the ExecutionException catch block and its cause will be thrown, losing the interrupt status.
Add a specific catch block for InterruptedException that restores the interrupt status before rethrowing:
        return local;
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw e;
    } catch (ExecutionException e) {
        throw e.getCause();
    }
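To make the suggested handling concrete, here is a self-contained sketch of waiting on two futures while preserving the interrupt status. It is an illustration under assumptions rather than the code from MultiClusterSpecIT: the futures carry plain `Integer` status codes instead of `Response` objects, and the names `AwaitBothSketch` and `awaitBoth` are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class AwaitBothSketch {
    /**
     * Waits for both futures and returns the local result.
     * Assumption: the values are status codes that must match.
     */
    static Integer awaitBoth(CompletableFuture<Integer> remote, CompletableFuture<Integer> local) throws Exception {
        try {
            Integer remoteStatus = remote.get();
            Integer localStatus = local.get();
            if (!remoteStatus.equals(localStatus)) {
                throw new AssertionError("remote=" + remoteStatus + " local=" + localStatus);
            }
            return localStatus;
        } catch (InterruptedException e) {
            // get() clears the interrupt flag when it throws; set it again so callers can see it.
            Thread.currentThread().interrupt();
            throw e;
        } catch (ExecutionException e) {
            // Unwrap the failure of the asynchronous request where possible.
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw (Exception) cause;
            }
            if (cause instanceof Error) {
                throw (Error) cause;
            }
            throw e;
        }
    }
}
```

The explicit `instanceof` checks are a design choice of this sketch: they let the method declare `throws Exception` rather than `throws Throwable`, which the suggested `throw e.getCause()` would require.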
Some recent test performance updates exposed fragility in test setup caused by non-thread-safe code and other concurrency issues, so two optimizations were reverted. A recent PR, elastic#136780, made test setup more resilient to these concurrency issues, which should allow us to bring back the performance optimizations.
The resiliency fix in elastic#136780 was originally extracted from the Views PR at elastic#134995. Because of early issues with view creation and deletion during test setup, that PR reliably exposed the same errors seen in elastic#134736, so the fix seems likely to resolve the issues seen with the optimizations as well.
Fixes elastic#134890